package cn.dglydrpy.study.j2ee.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import cn.dglydrpy.study.j2ee.kafka.partitioner.MyKafkaPartitioner;

import java.util.HashMap;
import java.util.Map;

/**
 * Producer demo: registers a custom partitioner via
 * ProducerConfig.PARTITIONER_CLASS_CONFIG so that the partitioner, not the
 * caller, decides which partition of tp_part each record is written to.
 */
public class MyKafkaPartitionerProducer {
    public static void main(String[] args) {

        // Producer configuration: broker address and String serializers for key and value
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.131:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Register the custom partitioner instead of the default one
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class);

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        // Do not set a partition here; leave the choice to the custom partitioner.
        // The tp_part topic must be created before running this.
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "tp_part",
                "mykey",
                "myvalue"
        );

        // Send asynchronously; the callback reports where the record landed
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.out.println("Message send failed");
                exception.printStackTrace();
            } else {
                System.out.println("topic:     " + metadata.topic());
                System.out.println("partition: " + metadata.partition());
                System.out.println("offset:    " + metadata.offset());
            }
        });

        // Close the producer; close() blocks until previously sent records
        // are delivered, so the callback above still fires before exit.
        producer.close();

    }
}
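
// ---------------------------------------------------------------------------
// For reference: a minimal sketch of what a custom Partitioner such as the
// MyKafkaPartitioner imported above could look like. This is an assumption,
// not the actual implementation: the real class lives in
// cn.dglydrpy.study.j2ee.kafka.partitioner and its routing logic may differ.
// The hypothetical name MyKafkaPartitionerSketch avoids clashing with the
// imported class.
// ---------------------------------------------------------------------------
class MyKafkaPartitionerSketch implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes,
                         org.apache.kafka.common.Cluster cluster) {
        // Illustrative rule: spread records across the topic's partitions by
        // key hash; records without a key all go to partition 0.
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (key == null || numPartitions == 0) {
            return 0;
        }
        // Mask the sign bit so the modulo result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() {
        // No resources to release
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No extra configuration needed for this sketch
    }
}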
